1
2
3
4
5
6
7 package ca.uhn.cache.util;
8
9 import java.util.ArrayList;
10 import java.util.Collection;
11 import java.util.Iterator;
12
13 import EDU.oswego.cs.dl.util.concurrent.Executor;
14 import EDU.oswego.cs.dl.util.concurrent.ThreadedExecutor;
15
16 import ca.uhn.cache.util.exception.MutableIteratorException;
17
18 /***
19 * Default implementation of IMutableMergeableIterator.
20 *
21 * @author <a href="mailto:bryan.tripp@uhn.on.ca">Bryan Tripp</a>
22 * @version $Revision: 1.1 $ updated on $Date: 2005/01/26 00:25:50 $ by $Author: bryan_tripp $
23 */
24 public class MutableMergeableIterator extends MutableIterator implements
25 IMutableMergeableIterator {
26
27 private Collection myMergedIterators;
28 private int myInProgressMerges = 0;
29 private Executor myExecutor;
30
31 /***
32 * @param theExecutor to manage threading
33 */
34 public MutableMergeableIterator(Executor theExecutor) {
35 myMergedIterators = new ArrayList(10);
36 myExecutor = theExecutor;
37 }
38
39 /***
40 * New instance with default Executor.
41 */
42 public MutableMergeableIterator() {
43 myMergedIterators = new ArrayList(10);
44 myExecutor = new ThreadedExecutor();
45 }
46
47 /***
48 * @see ca.uhn.cache.util.IMutableMergeableIterator#merge(java.util.Iterator)
49 */
50 public void merge(final Iterator theIterator) {
51 myInProgressMerges++;
52
53 if (theIterator instanceof IMutableIterator) {
54 myMergedIterators.add(theIterator);
55 }
56
57 Runnable merger = new Runnable() {
58 public void run() {
59 doMerge(theIterator);
60 }
61 };
62
63 thread(merger);
64 }
65
66 private void doMerge(Iterator theIterator) {
67 try {
68 while (theIterator.hasNext()) {
69 add(theIterator.next());
70 }
71 } catch (MutableIteratorException e) {
72 declareException(e);
73 }
74
75 myInProgressMerges--;
76 }
77
78 /***
79 * Waits until all in-progress merges are complete, then calls super.freeze(). Note
80 * that it's possible to continue calling add() in the mean time ... but don't,
81 * because you never know when the merges will finish.
82 */
83 public void freeze() {
84
85
86 Runnable freezer = new Runnable() {
87 public void run() {
88 doFreeze();
89 }
90 };
91
92 thread(freezer);
93 }
94
95 private void doFreeze() {
96
97
98 try {
99 while (myInProgressMerges > 0) {
100 Thread.sleep(2);
101 }
102 } catch (InterruptedException e) {
103
104 }
105
106 super.freeze();
107 }
108
109 /***
110 * Closes merged IMutableIterators. Override if you need to do more here. But don't
111 * forget to call super.close().
112 *
113 * @see ca.uhn.cache.IMutableIterator#close()
114 */
115 public void close() {
116 for (Iterator it = myMergedIterators.iterator(); it.hasNext(); ) {
117 ((IMutableIterator) it.next()).close();
118 }
119 }
120
121 private void thread(Runnable theTask) {
122 try {
123 myExecutor.execute(theTask);
124 } catch (InterruptedException e) {
125 declareException(e);
126 }
127 }
128
129 }